java背压不适用于groupBy运算符
我试图对我的代码施加反压力,但它不起作用。 我尝试使用给定的示例代码here它看起来很有效
Flux.range(1,100)
.doOnNext(d->getLogger().info("receive record ::: {}",d))
.flatMap(recordFlux -> Mono.delay(Duration.ofSeconds(30))
.doOnNext(d->getLogger().info("processed message :: {}",recordFlux))
.then(Mono.just(recordFlux))
,1
)
.subscribe();
这是我得到的结果
2021-11-02 15:34:25.678 INFO 7456 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 1
2021-11-02 15:34:55.682 INFO 7456 --- [ parallel-1] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 1
2021-11-02 15:34:55.684 INFO 7456 --- [ parallel-1] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 2
2021-11-02 15:35:25.685 INFO 7456 --- [ parallel-2] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 2
2021-11-02 15:35:25.686 INFO 7456 --- [ parallel-2] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 3
2021-11-02 15:35:55.687 INFO 7456 --- [ parallel-3] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 3
2021-11-02 15:35:55.687 INFO 7456 --- [ parallel-3] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 4
2021-11-02 15:36:25.690 INFO 7456 --- [ parallel-4] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 4
2021-11-02 15:36:25.691 INFO 7456 --- [ parallel-4] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 5
2021-11-02 15:36:55.697 INFO 7456 --- [ parallel-5] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 5
2021-11-02 15:36:55.698 INFO 7456 --- [ parallel-5] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 6
2021-11-02 15:37:25.704 INFO 7456 --- [ parallel-6] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 6
2021-11-02 15:37:25.704 INFO 7456 --- [ parallel-6] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 7
2021-11-02 15:37:55.714 INFO 7456 --- [ parallel-7] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 7
2021-11-02 15:37:55.714 INFO 7456 --- [ parallel-7] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 8
2021-11-02 15:38:25.720 INFO 7456 --- [ parallel-8] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 8
2021-11-02 15:38:25.720 INFO 7456 --- [ parallel-8] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 9
2021-11-02 15:38:55.723 INFO 7456 --- [ parallel-9] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 9
2021-11-02 15:38:55.723 INFO 7456 --- [ parallel-9] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 10
2021-11-02 15:39:25.726 INFO 7456 --- [ parallel-10] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 10
但当我在flatmap上添加额外的groupBy时,它就不起作用了。添加groupBy的效果如何?如何使用下面的代码实现上述结果
Flux.range(1,100)
.doOnNext(d->getLogger().info("receive record ::: {}",d))
.groupBy(m->1 )
.flatMap(consumerRecordFlux -> consumerRecordFlux
.doOnNext(a -> getLogger().info("before process message :: partition ::{}, record ::{}",consumerRecordFlux.key(),a))
.flatMap(b-> Mono.delay(Duration.ofSeconds(30))
.doOnNext(d->getLogger().info("processed message :: {}",b))
.then(Mono.just(b))
,1,1
)
,1,1
)
.subscribe();
这是错误的输入,我得到了上述代码
2021-11-02 15:51:44.827 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 1
2021-11-02 15:51:44.829 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : before process message :: partition ::1, record ::1
2021-11-02 15:51:44.833 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 2
2021-11-02 15:51:44.833 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 3
2021-11-02 15:51:44.833 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 4
2021-11-02 15:51:44.833 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 5
2021-11-02 15:51:44.833 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 6
2021-11-02 15:51:44.833 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 7
2021-11-02 15:51:44.833 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 8
2021-11-02 15:51:44.833 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 9
2021-11-02 15:51:44.833 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 10
2021-11-02 15:51:44.833 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 11
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 12
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 13
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 14
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 15
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 16
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 17
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 18
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 19
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 20
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 21
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 22
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 23
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 24
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 25
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 26
2021-11-02 15:51:44.834 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 27
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 28
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 29
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 30
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 31
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 32
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 33
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 34
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 35
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 36
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 37
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 38
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 39
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 40
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 41
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 42
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 43
2021-11-02 15:51:44.835 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 44
2021-11-02 15:51:44.836 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 45
2021-11-02 15:51:44.836 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 46
2021-11-02 15:51:44.836 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 47
2021-11-02 15:51:44.836 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 48
2021-11-02 15:51:44.836 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 49
2021-11-02 15:51:44.836 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 50
2021-11-02 15:51:44.836 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 51
2021-11-02 15:51:44.836 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 52
2021-11-02 15:51:44.836 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 53
2021-11-02 15:51:44.836 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 54
2021-11-02 15:51:44.836 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 55
2021-11-02 15:51:44.837 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 56
2021-11-02 15:51:44.837 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 57
2021-11-02 15:51:44.837 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 58
2021-11-02 15:51:44.837 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 59
2021-11-02 15:51:44.837 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 60
2021-11-02 15:51:44.837 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 61
2021-11-02 15:51:44.837 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 62
2021-11-02 15:51:44.837 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 63
2021-11-02 15:51:44.837 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 64
2021-11-02 15:51:44.838 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 65
2021-11-02 15:51:44.838 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 66
2021-11-02 15:51:44.838 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 67
2021-11-02 15:51:44.838 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 68
2021-11-02 15:51:44.838 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 69
2021-11-02 15:51:44.838 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 70
2021-11-02 15:51:44.838 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 71
2021-11-02 15:51:44.838 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 72
2021-11-02 15:51:44.838 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 73
2021-11-02 15:51:44.838 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 74
2021-11-02 15:51:44.838 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 75
2021-11-02 15:51:44.838 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 76
2021-11-02 15:51:44.838 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 77
2021-11-02 15:51:44.839 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 78
2021-11-02 15:51:44.839 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 79
2021-11-02 15:51:44.839 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 80
2021-11-02 15:51:44.839 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 81
2021-11-02 15:51:44.839 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 82
2021-11-02 15:51:44.839 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 83
2021-11-02 15:51:44.839 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 84
2021-11-02 15:51:44.839 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 85
2021-11-02 15:51:44.839 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 86
2021-11-02 15:51:44.839 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 87
2021-11-02 15:51:44.839 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 88
2021-11-02 15:51:44.839 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 89
2021-11-02 15:51:44.840 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 90
2021-11-02 15:51:44.840 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 91
2021-11-02 15:51:44.840 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 92
2021-11-02 15:51:44.840 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 93
2021-11-02 15:51:44.840 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 94
2021-11-02 15:51:44.840 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 95
2021-11-02 15:51:44.840 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 96
2021-11-02 15:51:44.840 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 97
2021-11-02 15:51:44.840 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 98
2021-11-02 15:51:44.840 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 99
2021-11-02 15:51:44.840 INFO 8071 --- [ main] c.k.l.ConductorIndexUpdatesKafkaConsumer : receive record ::: 100
2021-11-02 15:52:14.836 INFO 8071 --- [ parallel-1] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 1
2021-11-02 15:52:14.837 INFO 8071 --- [ parallel-1] c.k.l.ConductorIndexUpdatesKafkaConsumer : before process message :: partition ::1, record ::2
2021-11-02 15:52:44.843 INFO 8071 --- [ parallel-2] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 2
2021-11-02 15:52:44.844 INFO 8071 --- [ parallel-2] c.k.l.ConductorIndexUpdatesKafkaConsumer : before process message :: partition ::1, record ::3
2021-11-02 15:53:14.846 INFO 8071 --- [ parallel-3] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 3
2021-11-02 15:53:14.847 INFO 8071 --- [ parallel-3] c.k.l.ConductorIndexUpdatesKafkaConsumer : before process message :: partition ::1, record ::4
2021-11-02 15:53:44.852 INFO 8071 --- [ parallel-4] c.k.l.ConductorIndexUpdatesKafkaConsumer : processed message :: 4
共 (0) 个答案